# Toggle parallel downloads; when enabled, use all available cores.
PARCOMPUTE = TRUE
N_CORE = parallel::detectCores()
In this notebook, we repeat the analysis of 02_temporal_heterogeneity.Rmd for all of our core indicators.
# Download a signal day by day, as it would have appeared in the API
# n_delay days after each date (i.e., queried with as_of = t + n_delay).
download_delayed_data = function(source_name,
                                 signal_name,
                                 start_day,
                                 end_day,
                                 geo_level,
                                 n_delay) {
  iterdays = 0:(end_day - start_day)
  if (!PARCOMPUTE) {
    lfunc = lapply
  } else {
    lfunc = function(x, f) { parallel::mclapply(x, f, mc.cores = N_CORE) }
  }
  bind_rows(lfunc(iterdays, function(dt) {
    suppressWarnings(
      covidcast_signal(source_name,
                       signal_name,
                       start_day + dt,
                       start_day + dt,
                       geo_type = geo_level,
                       as_of = start_day + dt + n_delay))
  }))
}
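For reference, a single illustrative call (the signal and dates here are arbitrary) returns one combined data frame with the usual covidcast columns (data_source, signal, geo_value, time_value, issue, lag, value, stderr, sample_size):

# Facebook CLI at the county level, as it appeared 5 days after each date.
df_example = download_delayed_data("fb-survey", "smoothed_cli",
                                   lubridate::ymd("2020-04-15"),
                                   lubridate::ymd("2020-04-30"),
                                   "county", 5)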
# Fetch the following sources and signals from the API
# TODO: Add Google Symptoms "eventually"
source_names = c("doctor-visits", "fb-survey", "fb-survey", "hospital-admissions")
signal_names = c("smoothed_adj_cli", "smoothed_cli", "smoothed_hh_cmnty_cli",
                 "smoothed_adj_covid19")
pretty_names = c("Doctor visits", "Facebook CLI", "Facebook CLI-in-community",
                 "Hospitalizations")
target_names = c("Cases", "Cases", "Cases", "Deaths")
geo_level = "county"
start_day = lubridate::ymd("2020-04-15")
end_day = lubridate::today()
n_delays = 1:14
for (n_delay in n_delays) {
  message(n_delay)
  cache_fname = sprintf('cached_data/05_delayed_indicators_d%02d.RDS',
                        n_delay)
  if (!file.exists(cache_fname)) {
    df_signals = vector("list", length(signal_names))
    for (ind_idx in 1:length(signal_names)) {
      df_signals[[ind_idx]] = download_delayed_data(source_names[ind_idx],
                                                    signal_names[ind_idx],
                                                    start_day,
                                                    end_day,
                                                    geo_level,
                                                    n_delay)
    }
    # Fetch USAFacts confirmed case and death incidence proportions
    # (both smoothed with a 7-day trailing average)
    df_cases = download_delayed_data("usa-facts",
                                     "confirmed_7dav_incidence_prop",
                                     start_day,
                                     end_day,
                                     geo_level,
                                     n_delay)
    df_deaths = download_delayed_data("usa-facts",
                                      "deaths_7dav_incidence_prop",
                                      start_day,
                                      end_day,
                                      geo_level,
                                      n_delay)
    saveRDS(list(df_signals, df_cases, df_deaths), cache_fname)
  } else {
    cached_data = readRDS(cache_fname)
    df_signals = cached_data[[1]]
    df_cases = cached_data[[2]]
    df_deaths = cached_data[[3]]
  }
}
## 1
## 2
## 3
## 4
## 5
## 6
## 7
## 8
## 9
## 10
## 11
## 12
## 13
## 14
# TODO: Understand the distribution of delay in the different dataframes
n0 = 5
cache_fname = sprintf('cached_data/05_delayed_indicators_d%02d.RDS', n0)
d0 = readRDS(cache_fname)
n1 = 7
cache_fname = sprintf('cached_data/05_delayed_indicators_d%02d.RDS', n1)
d1 = readRDS(cache_fname)
n2 = 12
cache_fname = sprintf('cached_data/05_delayed_indicators_d%02d.RDS', n2)
d2 = readRDS(cache_fname)
d0[[1]][[2]] %>% group_by(lag) %>% summarise(count=n())
## `summarise()` ungrouping output (override with `.groups` argument)
## # A tibble: 5 x 2
## lag count
## <int> <int>
## 1 1 531
## 2 2 38158
## 3 3 35804
## 4 4 23787
## 5 5 61406
d1[[1]][[2]] %>% group_by(lag) %>% summarise(count=n())
## `summarise()` ungrouping output (override with `.groups` argument)
## # A tibble: 7 x 2
## lag count
## <int> <int>
## 1 1 531
## 2 2 37480
## 3 3 27303
## 4 4 15963
## 5 5 56074
## 6 6 12608
## 7 7 11494
d2[[1]][[2]] %>% group_by(lag) %>% summarise(count=n())
## `summarise()` ungrouping output (override with `.groups` argument)
## # A tibble: 12 x 2
## lag count
## <int> <int>
## 1 1 531
## 2 2 33920
## 3 3 23321
## 4 4 11518
## 5 5 50514
## 6 6 9636
## 7 7 8740
## 8 8 6968
## 9 9 6094
## 10 10 5050
## 11 11 5006
## 12 12 4958
d0[[1]][[2]] %>% filter(lag==2) %>% as_tibble
## # A tibble: 38,158 x 9
## data_source signal geo_value time_value issue lag value stderr
## <chr> <chr> <chr> <date> <date> <int> <dbl> <dbl>
## 1 fb-survey smoot… 01000 2020-07-14 2020-07-16 2 1.17 0.138
## 2 fb-survey smoot… 01001 2020-07-14 2020-07-16 2 1.27 0.793
## 3 fb-survey smoot… 01003 2020-07-14 2020-07-16 2 2.06 0.502
## 4 fb-survey smoot… 01015 2020-07-14 2020-07-16 2 0.678 0.483
## 5 fb-survey smoot… 01031 2020-07-14 2020-07-16 2 0.844 0.426
## 6 fb-survey smoot… 01043 2020-07-14 2020-07-16 2 1.48 0.757
## 7 fb-survey smoot… 01045 2020-07-14 2020-07-16 2 1.37 0.570
## 8 fb-survey smoot… 01051 2020-07-14 2020-07-16 2 0.653 0.456
## 9 fb-survey smoot… 01069 2020-07-14 2020-07-16 2 1.62 0.647
## 10 fb-survey smoot… 01071 2020-07-14 2020-07-16 2 1.76 0.901
## # … with 38,148 more rows, and 1 more variable: sample_size <dbl>
d0[[1]][[2]] %>% filter(lag==2) %>% head
## data_source signal geo_value time_value issue lag value
## 1 fb-survey smoothed_cli 01000 2020-07-14 2020-07-16 2 1.1690438
## 2 fb-survey smoothed_cli 01001 2020-07-14 2020-07-16 2 1.2700535
## 3 fb-survey smoothed_cli 01003 2020-07-14 2020-07-16 2 2.0625811
## 4 fb-survey smoothed_cli 01015 2020-07-14 2020-07-16 2 0.6782946
## 5 fb-survey smoothed_cli 01031 2020-07-14 2020-07-16 2 0.8438819
## 6 fb-survey smoothed_cli 01043 2020-07-14 2020-07-16 2 1.4840183
## stderr sample_size
## 1 0.1383003 1752.7482
## 2 0.7932612 143.9234
## 3 0.5020289 595.1874
## 4 0.4833256 145.6247
## 5 0.4255643 106.9909
## 6 0.7568023 107.2948
Recall the following terminology:

- The `as_of` parameter of the API gives you data for time \(t\) "as of" another time \(t+\delta\).
- The `issue` column in the data specifies the date on which a row was uploaded into the API. In the best case, issue is time \(t+1\), i.e., we get data immediately (the day after).
- The `lag` column is simply issue - time_value (not issue - as_of: e.g., in the tables above, time_value 2020-07-14 with issue 2020-07-16 gives lag 2, even though d0 was fetched with as_of = time_value + 5).

The way we created this dataset is by placing an upper bound on lag: querying with as_of = time_value + n_delay means no row can have lag greater than n_delay. As we reduce this bound, we look at data that is more and more "contemporaneous" with the date we are interested in. This matters because some data sources issue data for the same day multiple times, revising the values with each issue (backfill). By controlling this upper bound, we can study how our sensorization approach performs at different levels of data maturity.
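To make this concrete, here is a minimal sketch (dates illustrative) that fetches the same day twice with different as_of values; the later snapshot can include revised rows with larger lag:

# July 14 data as it looked two days later vs. ten days later.
early = covidcast_signal("fb-survey", "smoothed_cli",
                         lubridate::ymd("2020-07-14"), lubridate::ymd("2020-07-14"),
                         geo_type = "county",
                         as_of = lubridate::ymd("2020-07-16"))
late = covidcast_signal("fb-survey", "smoothed_cli",
                        lubridate::ymd("2020-07-14"), lubridate::ymd("2020-07-14"),
                        geo_type = "county",
                        as_of = lubridate::ymd("2020-07-24"))
# lag = issue - time_value, so early$lag is at most 2 and late$lag at most 10.
table(early$lag)
table(late$lag)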
# TODO: investigate the amount of data available for each delay. If
# there is pathologically little data available for some delays,
# then we may want to exclude it entirely (see the sketch after the
# printed counts below).
for (delay in n_delays) {
  cache_fname = sprintf('cached_data/05_delayed_indicators_d%02d.RDS',
                        delay)
  cached_data = readRDS(cache_fname)
  cat(sprintf('delay=%d\n', delay))
  for (ind_idx in 1:length(signal_names)) {
    cat(sprintf('%s %s: %d\n',
                source_names[ind_idx],
                signal_names[ind_idx],
                nrow(cached_data[[1]][[ind_idx]])))
  }
  cat(sprintf('Cases: %d\n', nrow(cached_data[[2]])))
  cat(sprintf('Deaths: %d\n', nrow(cached_data[[3]])))
  cat('\n')
}
## delay=1
## doctor-visits smoothed_adj_cli: 947
## fb-survey smoothed_cli: 79502
## fb-survey smoothed_hh_cmnty_cli: 77630
## hospital-admissions smoothed_adj_covid19: 0
## Cases: 298490
## Deaths: 295348
##
## delay=2
## doctor-visits smoothed_adj_cli: 2965
## fb-survey smoothed_cli: 152159
## fb-survey smoothed_hh_cmnty_cli: 131868
## hospital-admissions smoothed_adj_covid19: 0
## Cases: 389608
## Deaths: 386466
##
## delay=3
## doctor-visits smoothed_adj_cli: 158953
## fb-survey smoothed_cli: 157788
## fb-survey smoothed_hh_cmnty_cli: 135572
## hospital-admissions smoothed_adj_covid19: 22087
## Cases: 405318
## Deaths: 405318
##
## delay=4
## doctor-visits smoothed_adj_cli: 270004
## fb-survey smoothed_cli: 158761
## fb-survey smoothed_hh_cmnty_cli: 136651
## hospital-admissions smoothed_adj_covid19: 32597
## Cases: 417886
## Deaths: 417886
##
## delay=5
## doctor-visits smoothed_adj_cli: 299238
## fb-survey smoothed_cli: 159686
## fb-survey smoothed_hh_cmnty_cli: 137714
## hospital-admissions smoothed_adj_covid19: 36663
## Cases: 430454
## Deaths: 430454
##
## delay=6
## doctor-visits smoothed_adj_cli: 322337
## fb-survey smoothed_cli: 160566
## fb-survey smoothed_hh_cmnty_cli: 138677
## hospital-admissions smoothed_adj_covid19: 40054
## Cases: 430454
## Deaths: 430454
##
## delay=7
## doctor-visits smoothed_adj_cli: 337736
## fb-survey smoothed_cli: 161453
## fb-survey smoothed_hh_cmnty_cli: 139600
## hospital-admissions smoothed_adj_covid19: 42706
## Cases: 433596
## Deaths: 433596
##
## delay=8
## doctor-visits smoothed_adj_cli: 349926
## fb-survey smoothed_cli: 162346
## fb-survey smoothed_hh_cmnty_cli: 140572
## hospital-admissions smoothed_adj_covid19: 44871
## Cases: 433596
## Deaths: 433596
##
## delay=9
## doctor-visits smoothed_adj_cli: 359713
## fb-survey smoothed_cli: 163324
## fb-survey smoothed_hh_cmnty_cli: 141578
## hospital-admissions smoothed_adj_covid19: 46528
## Cases: 439880
## Deaths: 439880
##
## delay=10
## doctor-visits smoothed_adj_cli: 368732
## fb-survey smoothed_cli: 164350
## fb-survey smoothed_hh_cmnty_cli: 142557
## hospital-admissions smoothed_adj_covid19: 48070
## Cases: 446164
## Deaths: 446164
##
## delay=11
## doctor-visits smoothed_adj_cli: 375727
## fb-survey smoothed_cli: 165318
## fb-survey smoothed_hh_cmnty_cli: 143539
## hospital-admissions smoothed_adj_covid19: 49528
## Cases: 452448
## Deaths: 452448
##
## delay=12
## doctor-visits smoothed_adj_cli: 382249
## fb-survey smoothed_cli: 166256
## fb-survey smoothed_hh_cmnty_cli: 144404
## hospital-admissions smoothed_adj_covid19: 50883
## Cases: 458732
## Deaths: 458732
##
## delay=13
## doctor-visits smoothed_adj_cli: 386567
## fb-survey smoothed_cli: 167277
## fb-survey smoothed_hh_cmnty_cli: 145480
## hospital-admissions smoothed_adj_covid19: 52243
## Cases: 461874
## Deaths: 461874
##
## delay=14
## doctor-visits smoothed_adj_cli: 390511
## fb-survey smoothed_cli: 168494
## fb-survey smoothed_hh_cmnty_cli: 146583
## hospital-admissions smoothed_adj_covid19: 53475
## Cases: 465016
## Deaths: 465016
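Following the TODO above, a minimal sketch for flagging delays with too little data (min_rows = 1000 is an arbitrary cutoff, not a tuned choice):

# Keep only delays where every indicator clears a minimum row count.
min_rows = 1000
usable = sapply(n_delays, function(delay) {
  cached = readRDS(sprintf('cached_data/05_delayed_indicators_d%02d.RDS', delay))
  all(sapply(cached[[1]], nrow) >= min_rows)
})
n_delays[usable]
# With the counts above, this drops delays 1 and 2 (hospital-admissions has
# zero rows there), matching the choice of n_delays_fit below.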
sensorize_time_ranges = list(
  c(-7, -1),
  c(-10, -1),
  c(-14, -1),
  c(-21, -1))
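Each window c(llim, ulim) defines a trailing training set: to sensorize day \(s\) in a given county, we regress target on indicator over days \(s+\mathrm{llim},\dots,s+\mathrm{ulim}\) within that county, then evaluate the fitted line at day \(s\), i.e., sensorized_value = intercept + slope * indicator_value. A minimal sketch of the window expansion used below (date illustrative):

# For the window c(-7, -1), each sensorize_date is paired with the seven
# days preceding it; joining this onto the indicator data yields one
# training set per (geo_value, sensorize_date).
s = lubridate::ymd("2020-07-14")
tibble(sensorize_date = s, time_value = s + (-7:-1))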
# TODO: Add more "core indicators"
n_delays_fit = 3:14  # too little data for delays 1 and 2 (see above)
for (delay in n_delays_fit) {
  cat(sprintf('Delay=%d\n', delay))
  cache_fname = sprintf('cached_data/05_delayed_indicators_d%02d.RDS',
                        delay)
  cached_data = readRDS(cache_fname)
  df_signals = cached_data[[1]]
  df_cases = cached_data[[2]]
  df_deaths = cached_data[[3]]
  for (ind_idx in 1:length(source_names)) {
    base_cor_fname = sprintf('results/05_base_cors_%s_%s_delay%02d.RDS',
                             source_names[ind_idx], signal_names[ind_idx],
                             delay)
    sensorize_fname = sprintf('results/05_sensorize_cors_%s_%s_delay%02d.RDS',
                              source_names[ind_idx], signal_names[ind_idx],
                              delay)
    sensorize_val_fname = sprintf('results/05_sensorize_vals_%s_%s_delay%02d.RDS',
                                  source_names[ind_idx], signal_names[ind_idx],
                                  delay)
    if (target_names[ind_idx] == 'Cases') {
      df_target = df_cases
    } else if (target_names[ind_idx] == 'Deaths') {
      df_target = df_deaths
    } else {
      stop(sprintf("No matching dataframe for target %s.", target_names[ind_idx]))
    }
    ind_df = as_tibble(df_signals[[ind_idx]])
    ind_target = inner_join(ind_df, as_tibble(df_target),
                            by=c('geo_value', 'time_value')) %>%
      select(
        geo_value=geo_value,
        time_value=time_value,
        indicator_value=value.x,
        target_value=value.y,
      )
    # "Spatial" sensorization: one linear model per county, fit over the
    # full time range.
    ind_global_sensorized = ind_target %>%
      group_by(geo_value) %>%
      group_modify(~ {
        fit = lm(target_value ~ indicator_value, data = .x)
        tibble(time_value=.x$time_value,
               indicator_value=.x$indicator_value,
               target_value=.x$target_value,
               sensorized_value=fit$fitted.values)
      }) %>% ungroup
    df_global_sensorized = ind_global_sensorized %>% transmute(
      geo_value=geo_value,
      signal='ind_sensorized',
      time_value=time_value,
      direction=NA,
      issue=lubridate::ymd('2020-11-01'),
      lag=NA,
      value=sensorized_value,
      stderr=NA,
      sample_size=NA,
      data_source='linear_sensorization',
    )
    attributes(df_global_sensorized)$geo_type = 'county'
    attributes(df_global_sensorized)$metadata$geo_type = 'county'
    class(df_global_sensorized) = c("covidcast_signal", "data.frame")
    if (!file.exists(base_cor_fname)) {
      df_cor_base_ind = covidcast_cor(df_signals[[ind_idx]], df_target,
                                      by='time_value', method='spearman')
      df_cor_sensorized_ind = covidcast_cor(df_global_sensorized, df_target,
                                            by='time_value', method='spearman')
      df_cor_base = rbind(df_cor_base_ind, df_cor_sensorized_ind)
      df_cor_base$Indicator = as.factor(c(rep(sprintf('Raw (Delay=%02d)',
                                                      delay),
                                              nrow(df_cor_base_ind)),
                                          rep(sprintf('Sensorized (Spatial, Delay=%02d)',
                                                      delay),
                                              nrow(df_cor_sensorized_ind))))
      saveRDS(df_cor_base, base_cor_fname)
    } else {
      df_cor_base = readRDS(base_cor_fname)
    }
    if (!file.exists(sensorize_fname)) {
      # "TS" sensorization: per-county models refit on a trailing window
      # of days for each sensorize_date.
      sensorize_cors = vector('list', length(sensorize_time_ranges))
      for (outer_idx in 1:length(sensorize_time_ranges)) {
        sensorize_llim = sensorize_time_ranges[[outer_idx]][1]
        sensorize_ulim = sensorize_time_ranges[[outer_idx]][2]
        min_sensorize_date = lubridate::ymd(start_day) - sensorize_llim
        max_sensorize_date = max(ind_target$time_value)
        sensorize_date_offsets = 0:(max_sensorize_date - min_sensorize_date)
        joiner_df_list = vector('list', length(sensorize_date_offsets))
        for (idx in 1:length(sensorize_date_offsets)) {
          dt = sensorize_date_offsets[idx]
          sensorize_date = min_sensorize_date + dt
          joiner_df_list[[idx]] = tibble(
            sensorize_date = sensorize_date,
            time_value = sensorize_date + sensorize_llim:sensorize_ulim)
        }
        joiner_df = bind_rows(joiner_df_list)
        if (!PARCOMPUTE) {
          ind_sensorized_lm = ind_target %>% inner_join(
            joiner_df,
            by='time_value',
          ) %>% group_by(
            geo_value,
            sensorize_date,
          ) %>% group_modify(
            ~ broom::tidy(lm(target_value ~ indicator_value, data = .x,
                             na.action=NULL))
          ) %>% ungroup
        } else {
          ind_grouped_list = ind_target %>% inner_join(
            joiner_df,
            by='time_value',
          ) %>% group_by(
            geo_value,
            sensorize_date,
          ) %>% group_split
          ind_sensorized_lm = parallel::mclapply(ind_grouped_list, function(df) {
            broom::tidy(
              lm(target_value ~ indicator_value, data = df)
            ) %>% mutate(
              geo_value = unique(df$geo_value),
              sensorize_date = unique(df$sensorize_date),
            )}, mc.cores = N_CORE) %>% bind_rows
        }
        ind_sensorized_wide = ind_sensorized_lm %>% select(
          geo_value,
          sensorize_date,
          term,
          estimate,
        ) %>% mutate(
          term = sapply(term, function(x) {ifelse(x=='(Intercept)',
                                                  'intercept',
                                                  'slope')}),
        ) %>% pivot_wider(
          id_cols = c(geo_value, sensorize_date),
          names_from=term,
          values_from=estimate,
        )
        ind_target_sensorized = ind_target %>% inner_join(
          ind_sensorized_wide,
          by=c('time_value'='sensorize_date',
               'geo_value'),
        ) %>% mutate(
          sensorized_value = intercept + indicator_value * slope,
        )
        df_sensorized = ind_target_sensorized %>% transmute(
          geo_value=geo_value,
          signal='ind_sensorized',
          time_value=time_value,
          direction=NA,
          issue=lubridate::ymd('2020-11-01'),
          lag=NA,
          value=sensorized_value,
          stderr=NA,
          sample_size=NA,
          data_source='linear_sensorization',
        )
        attributes(df_sensorized)$geo_type = 'county'
        class(df_sensorized) = c("covidcast_signal", "data.frame")
        df_cor_sensorized_ind = covidcast_cor(df_sensorized, df_target,
                                              by='time_value', method='spearman')
        df_cor_sensorized_ind$Indicator = sprintf('Sensorized (TS, %d:%d; Delay=%02d)',
                                                  sensorize_llim,
                                                  sensorize_ulim,
                                                  delay)
        sensorize_cors[[outer_idx]] = df_cor_sensorized_ind
      }
      saveRDS(sensorize_cors, sensorize_fname)
      saveRDS(ind_target_sensorized, sensorize_val_fname)
    } else {
      sensorize_cors = readRDS(sensorize_fname)
    }
    df_cor = bind_rows(df_cor_base, sensorize_cors)
    df_cor$Indicator = factor(df_cor$Indicator,
                              levels=c(sprintf('Raw (Delay=%02d)', delay),
                                       sprintf('Sensorized (Spatial, Delay=%02d)', delay),
                                       sapply(sensorize_time_ranges,
                                              function(x) {
                                                sprintf('Sensorized (TS, %d:%d; Delay=%02d)',
                                                        x[[1]], x[[2]], delay)
                                              })))
    plt = ggplot(df_cor, aes(x = time_value, y = value)) +
      geom_line(aes(color = Indicator)) +
      labs(title = sprintf("Correlation between %s and %s",
                           pretty_names[ind_idx],
                           target_names[ind_idx]),
           subtitle = sprintf("Per day; Delay=%02d", delay),
           x = "Date", y = "Correlation") +
      theme(legend.position = "bottom")
    print(plt)
  }
}
## Delay=3
## Warning: Removed 189 row(s) containing missing values (geom_path).
## Warning: Removed 83 row(s) containing missing values (geom_path).
## Warning: Removed 62 row(s) containing missing values (geom_path).
## Warning: Removed 398 row(s) containing missing values (geom_path).
## Delay=4
## Warning: Removed 94 row(s) containing missing values (geom_path).
## Warning: Removed 84 row(s) containing missing values (geom_path).
## Warning: Removed 63 row(s) containing missing values (geom_path).
## Warning: Removed 314 row(s) containing missing values (geom_path).
## Delay=5
## Warning: Removed 94 row(s) containing missing values (geom_path).
## Warning: Removed 84 row(s) containing missing values (geom_path).
## Warning: Removed 63 row(s) containing missing values (geom_path).
## Warning: Removed 320 row(s) containing missing values (geom_path).
## Delay=6
## Warning: Removed 95 row(s) containing missing values (geom_path).
## Warning: Removed 85 row(s) containing missing values (geom_path).
## Warning: Removed 64 row(s) containing missing values (geom_path).
## Warning: Removed 306 row(s) containing missing values (geom_path).
## Delay=7
## Warning: Removed 96 row(s) containing missing values (geom_path).
## Warning: Removed 86 row(s) containing missing values (geom_path).
## Warning: Removed 65 row(s) containing missing values (geom_path).
## Warning: Removed 300 row(s) containing missing values (geom_path).
## Delay=8
## Warning: Removed 97 row(s) containing missing values (geom_path).
## Warning: Removed 87 row(s) containing missing values (geom_path).
## Warning: Removed 66 row(s) containing missing values (geom_path).
## Warning: Removed 301 row(s) containing missing values (geom_path).
## Delay=9
## Warning: Removed 94 row(s) containing missing values (geom_path).
## Warning: Removed 84 row(s) containing missing values (geom_path).
## Warning: Removed 63 row(s) containing missing values (geom_path).
## Warning: Removed 298 row(s) containing missing values (geom_path).
## Delay=10
## Warning: Removed 85 row(s) containing missing values (geom_path).
## Warning: Removed 75 row(s) containing missing values (geom_path).
## Warning: Removed 54 row(s) containing missing values (geom_path).
## Warning: Removed 289 row(s) containing missing values (geom_path).
## Delay=11
## Warning: Removed 85 row(s) containing missing values (geom_path).
## Warning: Removed 75 row(s) containing missing values (geom_path).
## Warning: Removed 54 row(s) containing missing values (geom_path).
## Warning: Removed 289 row(s) containing missing values (geom_path).
## Delay=12
## Warning: Removed 81 row(s) containing missing values (geom_path).
## Warning: Removed 71 row(s) containing missing values (geom_path).
## Warning: Removed 50 row(s) containing missing values (geom_path).
## Warning: Removed 285 row(s) containing missing values (geom_path).
## Delay=13
## Warning: Removed 81 row(s) containing missing values (geom_path).
## Warning: Removed 71 row(s) containing missing values (geom_path).
## Warning: Removed 50 row(s) containing missing values (geom_path).
## Warning: Removed 285 row(s) containing missing values (geom_path).
## Delay=14
## Warning: Removed 81 row(s) containing missing values (geom_path).
## Warning: Removed 71 row(s) containing missing values (geom_path).
## Warning: Removed 50 row(s) containing missing values (geom_path).
## Warning: Removed 285 row(s) containing missing values (geom_path).